package ssm

import com.sun.prism.PixelFormat.DataType
import org.apache.parquet.format.IntType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

import java.util.Properties

object UrlCount {

  /**
   * Entry point: periodically refreshes the search-engine click counts.
   *
   * Loops forever, sleeping 10 s between runs, then recomputes the
   * `se_counts` table from the `warehouse` table.
   *
   * NOTE(review): DB credentials are hard-coded in source; move them to
   * configuration/environment before this leaves a dev machine.
   */
  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "z9633352") // TODO: load from config, never commit secrets
    prop.put("driver", "com.mysql.jdbc.Driver")
    val url = "jdbc:mysql://localhost:3306/python_db"
    while (true) {
      Thread.sleep(10000)
      println("开始更新搜索点击量")
      SECounts(prop, url)
    }
  }

  /**
   * Reads the `SE` column from the `warehouse` table, maps each hostname to a
   * Chinese search-engine display name, counts occurrences per engine, prints
   * the counts, and overwrites the `se_counts` table with the result.
   *
   * @param prop JDBC connection properties (user / password / driver)
   * @param url  JDBC URL of the target MySQL database
   */
  def SECounts(prop: Properties, url: String): Unit = {
    val spark = SparkSession.builder().appName("urlCount").master("local").getOrCreate()

    val SEFrame = spark.read.jdbc(url, "warehouse", prop).select("SE")

    // English host keyword -> Chinese display name.
    val nameByHost = Seq("baidu" -> "百度", "sogou" -> "搜狗", "bing" -> "必应", "yahoo" -> "雅虎")

    // Values are assumed shaped like "www.baidu.com"; the engine name is the
    // 2nd dot-separated segment. The guard skips malformed rows — the original
    // unchecked x(1) threw ArrayIndexOutOfBoundsException on dot-less values.
    val SE = SEFrame.rdd
      .map(_.mkString(""))
      .map(_.split("[.]"))
      .collect { case parts if parts.length > 1 => parts(1) }
      .map(host => nameByHost.foldLeft(host) { case (s, (en, zh)) => s.replaceAll(en, zh) })

    val seCount = SE
      .map((_, 1))
      .reduceByKey(_ + _)
      .sortBy(_._1)                      // stable alphabetical order in the output table
      .map { case (se, n) => Row(se, n) }
    seCount.foreach(println)             // debug output; runs on the (local) executors

    val schema = StructType(Array(
      StructField("Se", StringType),
      StructField("counts", IntegerType)
    ))
    val detailDF = spark.createDataFrame(seCount, schema)
    println("正在存入数据库")
    detailDF.write.mode("Overwrite").jdbc(url, "se_counts", prop)
    println("数据库存入完成")

    // Stop the whole session (this also stops the SparkContext); the next
    // loop iteration's getOrCreate then builds a fresh session.
    spark.stop()
  }
}
